{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Resilience against hardware failures\n",
    "\n",
    "Scenario: We have a cluster that consists partly of preemptible resources. That is, we'll have to deal with workers suddenly being shut down during computation. While demonstrated here with a `LocalCluster`, Dask's resilience against preempted resources is most useful with, e.g., [Dask Kubernetes](https://kubernetes.dask.org/) or [Dask Jobqueue](https://jobqueue.dask.org).\n",
    "\n",
    "Relevant docs: <http://distributed.dask.org/en/latest/resilience.html#hardware-failures>"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Increase resilience\n",
    "\n",
    "Whenever a worker shuts down unexpectedly, the scheduler increments the suspiciousness counter of _all_ tasks that were assigned to that worker, not only the ones it was computing at that moment. Once the suspiciousness of a task exceeds a threshold (`distributed.scheduler.allowed-failures`, 3 by default), the task is considered broken and the computation fails with a `KilledWorker` error. Here we compute many tasks on only a few workers while workers shut down at random, so we expect the suspiciousness of many tasks to grow quickly. Let's raise the threshold. This has to happen before the cluster is created, because the scheduler reads the setting when it starts:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import dask\n",
    "\n",
    "dask.config.set({'distributed.scheduler.allowed-failures': 100});"
   ]
  },
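  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "To see why the default threshold is too low for this scenario, the next cell is a deliberately crude toy model of the counting rule described above; it is not the scheduler's actual code. It assumes (our own simplification) 200 tasks spread over 4 workers, of which only workers 2 and 3 are preemptible, 20 worker deaths, and that every task on a dead worker is reassigned to a random worker. It ignores tasks finishing, so it overstates the damage; it only illustrates how quickly a threshold of 3 is exceeded. All names in it are hypothetical and unrelated to Dask's API."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import random\n",
    "\n",
    "# Toy model only: hypothetical names, not part of Dask's API\n",
    "DEFAULT_ALLOWED_FAILURES = 3  # default of `distributed.scheduler.allowed-failures`\n",
    "N_TASKS, N_WORKERS, N_KILLS = 200, 4, 20\n",
    "PREEMPTIBLE = [2, 3]  # assumption: workers 0 and 1 are never killed\n",
    "\n",
    "rng = random.Random(0)  # local RNG, so the global `random` state is untouched\n",
    "assignment = {task: rng.randrange(N_WORKERS) for task in range(N_TASKS)}\n",
    "suspicious = dict.fromkeys(assignment, 0)\n",
    "\n",
    "for _ in range(N_KILLS):\n",
    "    dead = rng.choice(PREEMPTIBLE)\n",
    "    for task, worker in assignment.items():\n",
    "        if worker == dead:\n",
    "            suspicious[task] += 1  # every task assigned to the dead worker is suspected\n",
    "            assignment[task] = rng.randrange(N_WORKERS)  # reschedule it on a random worker\n",
    "\n",
    "failed = [task for task, count in suspicious.items() if count > DEFAULT_ALLOWED_FAILURES]\n",
    "print(f\"{len(failed)} of {N_TASKS} tasks would exceed the default threshold\")"
   ]
  },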
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## All other imports"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from dask.distributed import Client, LocalCluster\n",
    "from dask import bag as db\n",
    "import os\n",
    "import random\n",
    "from time import sleep"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## A cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "cluster = LocalCluster(threads_per_worker=1, n_workers=4, memory_limit=400e6)\n",
    "client = Client(cluster)\n",
    "client"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## A simple workload\n",
    "\n",
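    "We'll multiply each number in a range by two, sleeping briefly in each call to simulate real work, and then reduce the doubled numbers by summing them. Splitting the 400 numbers into 200 partitions gives the scheduler many small tasks to redistribute when workers die."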
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def multiply_by_two(x):\n",
    "    sleep(0.02)\n",
    "    return 2 * x"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "N = 400\n",
    "\n",
    "x = db.from_sequence(range(N), npartitions=N // 2)\n",
    "\n",
    "mults = x.map(multiply_by_two)\n",
    "\n",
    "summed = mults.sum()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Suddenly shutting down workers"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Let's mark the process IDs of two workers as non-preemptible; only the remaining workers will be shut down."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
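    "# Process IDs of the workers currently known to the scheduler\n",
    "all_current_workers = [w.pid for w in cluster.scheduler.workers.values()]\n",
    "# Exempt the first two workers from being shut down\n",
    "non_preemptible_workers = all_current_workers[:2]"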
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
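    "import signal\n",
    "\n",
    "def kill_a_worker():\n",
    "    # Re-read the worker list on every call, since workers come and go during the computation\n",
    "    preemptible_workers = [\n",
    "        w.pid for w in cluster.scheduler.workers.values()\n",
    "        if w.pid not in non_preemptible_workers\n",
    "    ]\n",
    "    if preemptible_workers:\n",
    "        # SIGTERM (signal 15) simulates the worker being preempted\n",
    "        os.kill(random.choice(preemptible_workers), signal.SIGTERM)"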
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Start the computation and keep shutting down workers while it's running"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
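    "# Submit the computation; from here on `summed` refers to a Future for the result\n",
    "summed = client.compute(summed)\n",
    "\n",
    "# Every 3 seconds, kill a random preemptible worker until the result is ready\n",
    "while not summed.done():\n",
    "    kill_a_worker()\n",
    "    sleep(3.0)"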
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Check if results match"
   ]
  },
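  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Each element $x$ of `range(N)` is doubled before summing, so the expected result follows from the arithmetic series:\n",
    "\n",
    "$$\\sum_{x=0}^{N-1} 2x = 2 \\cdot \\frac{(N-1)N}{2} = N(N-1)$$\n",
    "\n",
    "For `N = 400` this gives $400 \\cdot 399 = 159600$."
   ]
  },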
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "print(f\"`sum(2 * x for x in range({N}))` on cluster: {summed.result()}\\t(should be {N * (N - 1)})\")"
   ]
  }
 ],
 "metadata": {
  "anaconda-cloud": {},
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
